PiBot: Remote Control & Vision

Raspberry Pi car with obstacle detection and camera.
A Project By Fanchen Kong (fk245), Ruiyang Jiang (rj393)  


Demonstration Video


Introduction

This project introduces an autonomous robot car operated through a Raspberry Pi. The aim is to develop a vehicle that responds effectively to its environment. The car is controlled remotely through a laptop interface built with Pygame. A key feature is the car's camera, which relays a live feed to the laptop so the operator can see from the car's perspective. The car is also equipped with three ultrasonic sensors, mounted on its left, right, and rear sides, which detect obstacles and measure distances. When an object approaches the car, the sensors alert the system and the car autonomously adjusts its path to avoid a collision. This project demonstrates how programming, sensor technology, and robotics can be combined to create an intelligent vehicle.
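The avoidance behavior described above boils down to a threshold check over the three most recent readings on each side. The sketch below is a simplified, socket-free restatement of that decision; the function name is hypothetical, but the thresholds (10 cm proximity, 15 cm closing rate, 20 cm clearance) are taken from the code appendix:

```python
def avoidance_action(right, left, back):
    """Decide an evasive move from the three most recent readings (cm)
    per side, newest first. Thresholds mirror the appendix code."""
    def closing_fast(hist):
        # Object closed more than 15 cm over the last three samples
        # and is now within 10 cm
        return (hist[0] - hist[2] < -15) and (hist[0] < 10)

    if closing_fast(right):
        # Dodge left only if there is clearance on the left
        return "LEFT" if left[0] > 20 else "UP"
    if closing_fast(left):
        return "RIGHT" if right[0] > 20 else "UP"
    if closing_fast(back):
        # Something approaching from behind: accelerate forward
        return "UP"
    return None  # no threat detected
```

The histories are kept short (three samples) so the rate check reacts within a few sensor polling cycles.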


Interface Design

Our project involves creating a user interface with Pygame to control a remote-controlled car. The interface features three key areas: a camera/warning region at the top, a distance-readout region in the middle, and a button panel with a STOP control at the bottom.

The car's operations are managed through three separate socket connections: one receives data from the ultrasonic sensors, another carries the camera feed, and the third sends motor control commands. The car is driven with the arrow keys on the laptop.
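The motor commands travel as short UTF-8 strings; the wire format below mirrors the appendix code (`ArrowKey:<DIRECTION>,Speed:<int>`). A minimal sketch of encoding and parsing with the socket layer omitted; the helper names here are our own:

```python
def encode_command(direction: str, speed: int = 30) -> bytes:
    """Build a motor-control message as sent over the motor socket."""
    return f"ArrowKey:{direction},Speed:{speed}".encode("utf-8")

def decode_command(raw: bytes) -> tuple:
    """Parse a motor-control message back into (direction, speed)."""
    direction_part, speed_part = raw.decode("utf-8").split(",")
    direction = direction_part.split(":")[1]   # e.g. "UP", "STOP"
    speed = int(speed_part.split(":")[1])      # PWM duty cycle, 0-100
    return direction, speed
```

The server splits on the same delimiters, so both ends stay in sync as long as the format string is unchanged.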


Server and Hardware

In our setup, three sockets are established to connect the client, our Pygame interface Python file, with the server. These sockets have distinct roles: one transmits ultrasonic sensor data, another handles the camera feed, and the third receives motor control data. On the server side, we utilize threading to manage these data streams simultaneously. This approach ensures that the ultrasonic sensors, camera, and motor controls operate and transfer data concurrently, enabling real-time interaction and control.
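The server's concurrency model can be sketched as follows. The handlers below are stand-ins for the real sensor, camera, and motor loops, not the project's actual socket code; each stream gets its own daemon thread and reports into a shared structure:

```python
import threading
import queue

def run_streams(handlers):
    """Run one daemon thread per named handler and collect each result.
    Handlers are placeholder callables standing in for the real
    sensor/camera/motor loops."""
    results = queue.Queue()
    threads = []
    for name, fn in handlers.items():
        t = threading.Thread(
            target=lambda n=name, f=fn: results.put((n, f())),
            daemon=True,  # daemon: the thread dies with the main program
        )
        t.start()
        threads.append(t)
    for t in threads:
        t.join(timeout=1.0)
    out = {}
    while not results.empty():
        name, value = results.get()
        out[name] = value
    return out
```

In the real server each handler is an infinite loop over its own accepted socket, so the threads never join; the daemon flag is what lets the program exit cleanly.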




Result

Our project achieved most of the initial objectives, with the Raspberry Pi-based robot car functioning effectively in several aspects. The integration of ultrasonic sensors and the camera with the Raspberry Pi was successful, enabling the car to detect nearby objects and relay live visuals to the Pygame interface. The obstacle detection and avoidance mechanism worked as planned, with the car appropriately altering its course when encountering obstacles.

However, we encountered some challenges. Notably, the car's wheels did not run at exactly the same speed, which affected its movement efficiency. While the basic movement controls worked well under normal conditions, responsiveness in emergency scenarios required fine-tuning, because the Arduino library we used for sensor operation proved less reliable than anticipated and occasionally caused data transmission issues.

Overall, the project demonstrated the potential of combining simple computer hardware with sophisticated sensor technology to create an interactive, remotely controlled vehicle. Future improvements could focus on enhancing the stability of the sensor data transmission and refining the emergency response mechanism.


Work Distribution

Ruiyang Jiang 

rj393@cornell.edu

Designed the Pygame interface, the ultrasonic sensors, motor control, and the server code.

Fanchen Kong

fk245@cornell.edu

Designed the socket connections, the sending and receiving of control commands, and the video stream.


Parts List

All of the parts were provided in the lab.  


References

Ultrasonic sensor Libraries
Pi Camera Manual
Pygame Document
Pi Distance Keeper

Code Appendix



##################################################################################################################
#Pygame_interface.py (run on laptop)

import pygame
import sys
import random
import time
import threading
import socket
import io
import struct

import cv2
import numpy as np
from PIL import Image

pygame.init()


client_socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = '10.49.59.78'  # NOTE: change this to your Raspberry Pi's IP address
port = 12345


video_port=12346
video_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


motor_port=12347
motor_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


client_socket.connect((host,port))
video_socket.connect((host, video_port))
motor_socket.connect((host, motor_port))


def send_arrow_key(direction,speed=30):
    message=f"ArrowKey:{direction},Speed:{speed}"
    motor_socket.send(message.encode('utf-8'))


# Set up the display
screen_width, screen_height = 600, 800
screen = pygame.display.set_mode((screen_width, screen_height))
pygame.display.set_caption('Raspberry Pi Car Control Interface')


# Define colors
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GREEN = (0, 255, 0)
RED = (255, 0, 0)
DARK_GREEN = (0, 200, 0)
LIGHT_GREEN = (144, 238, 144)
GRAY = (200, 200, 200)


# Button dimensions
button_width, button_height = 100, 50


# Define button positions
button_positions = {
    'STOP': (screen_width // 2 - button_width // 2, screen_height-3*button_height),
}


# Function to draw a region with a border
def draw_region_with_border(surface, rect, color, border_thickness=2):
    pygame.draw.rect(surface, color, rect)
    pygame.draw.rect(surface, BLACK, rect, border_thickness)
# Define stop button position and rect
stop_button_position = (screen_width // 2 - button_width // 2, screen_height - 3 * button_height)
stop_button_rect = pygame.Rect(stop_button_position, (button_width, button_height))


# screen size
camera_width, camera_height = screen_width, screen_height*2/4
message_height = 200
# camera region
camera_surface = pygame.Surface((camera_width, camera_height))
# message region
message_surface = pygame.Surface((screen_width, message_height))


# Placeholder function to handle movement (to be implemented with Raspberry Pi GPIO control)
def move_car(direction):
    if direction == 'stop':
        send_arrow_key("STOP")
        print("stop")


# Placeholder function for camera feed 
def display_camera_feed():
    # This function would update the Pygame display with the camera feed
    pass


# Placeholder function for distance warning 
def display_distance_warning(last_update_time):
    current_time = pygame.time.get_ticks()
    if current_time - last_update_time > 1000:  # 1000 milliseconds = 1 second
        # Generate new random distances
        display_distance_warning.distances = {
            'front': random.uniform(1, 10),
            'back': random.uniform(1, 10),
            'left': random.uniform(1, 10),
            'right': random.uniform(1, 10)
        }
        last_update_time = current_time


    # Clear the message surface
    message_surface.fill(WHITE)
    font = pygame.font.SysFont('arial', 36)


    # Display all four distance messages with the appropriate background color
    for direction, distance in display_distance_warning.distances.items():
        text_color = BLACK
        background = WHITE if distance >= 2 else RED  # highlight close obstacles
        text_surf = font.render(f"{direction.capitalize()}: {distance:.2f}m", True, text_color, background)
        text_rect = text_surf.get_rect(center=text_positions[direction])
        message_surface.blit(text_surf, text_rect)


    # Blit the message surface onto the main screen only once
    screen.blit(message_surface, (0, camera_height))


    return last_update_time


# Initialize the static distances with initial random values
display_distance_warning.distances = {
    'front': random.uniform(1, 10),
    'back': random.uniform(1, 10),
    'left': random.uniform(1, 10),
    'right': random.uniform(1, 10)
}


# Define positions for the text within the message_surface
text_positions = {
    'front': (screen_width // 2, message_height // 4),
    'back': (screen_width // 2, 3 * message_height // 4),
    'left': (screen_width // 4, message_height // 2),
    'right': (3 * screen_width // 4, message_height // 2),
}


right_distances=[0,0,0]
back_distances=[0,0,0]
left_distances=[0,0,0]
emergency_status=0  #1 means turn left, 2 means turn right, 3 means accelerate


# Define a variable at the top level to store the current message
current_warning_message = "All Good"


def update_distances_list(distances_list, new_distance):
    # Insert the new distance at the beginning of the list
    distances_list.insert(0, new_distance)
    # Drop the oldest reading so only the last three are kept
    distances_list.pop()


def process_distances(distances):
    global right_distances
    global back_distances
    global left_distances
    global emergency_status
    global current_warning_message


    # Update each list with the new sensor readings
    update_distances_list(right_distances, distances[0])
    update_distances_list(back_distances, distances[2])
    update_distances_list(left_distances, distances[1])


    # If the right ultrasonic sensor detects something approaching and the distance is
    # less than 10 cm, and the left distance is larger than 20 cm, then move the car left
    if ((right_distances[0] - right_distances[2] < -15) and (right_distances[0] < 10)):
        if left_distances[0] >20:
            send_arrow_key("LEFT",80)
            emergency_status=1
            current_warning_message = "Watch out your right!!"
        elif (back_distances[0] - back_distances[2] < -15) and (back_distances[0] < 10):
            send_arrow_key("LEFT",80)
            emergency_status=1
            current_warning_message = "Watch out your right!!"
        else:
            send_arrow_key("UP",80)
       
    if (left_distances[0] - left_distances[2] < -15) and (left_distances[0] < 10):
        if (right_distances[0]>20):
            send_arrow_key("RIGHT",80)
            emergency_status=2
            current_warning_message = "Watch out your left!!"
        elif (back_distances[0] - back_distances[2] < -15) and (back_distances[0] < 10):
            send_arrow_key("RIGHT",80)
            emergency_status=2
            current_warning_message = "Watch out your left!!"
        else:
            send_arrow_key("UP",80)
   
       
    if ((back_distances[0] - back_distances[2] < -15) and (back_distances[0] < 10)) or ((left_distances[0]<10) and (right_distances[0]<10)):
        send_arrow_key("UP",80)
        emergency_status=3
        current_warning_message = "Watch out your back!!"
    if (emergency_status==1) and right_distances[0]>15:
        send_arrow_key("UP")
        emergency_status=0
        current_warning_message = "All Good"
    if (emergency_status==2) and left_distances[0]>15:
        send_arrow_key("UP")
        emergency_status=0
        current_warning_message = "All Good"
    if (emergency_status==3) and back_distances[0]>15:
        send_arrow_key("UP")
        emergency_status=0
        current_warning_message = "All Good"
   


def update_distances():
    global distances  # Use a global variable to store distances
    while True:
        client_socket.settimeout(100)  # Block up to 100 s waiting for sensor data
        try:
            received_data = client_socket.recv(1024).decode('utf-8')
            if received_data:
                new_distances = [float(dist) for dist in received_data.split(',')]
                print("Received distances:", new_distances)  # Print the distances in the terminal


                # Update only if the new distances are non-zero
                if all(dist > 0 for dist in new_distances):
                    distances = new_distances
                    process_distances(distances)  # Process the distances and send commands
        except socket.timeout:
            # No data arrived within the timeout window
            pass
        except Exception as e:
            print(f"Error receiving distance data: {e}")


# Function to receive and display the video stream
def receive_video_stream():
    # Make a file-like object out of the connection
    connection = video_socket.makefile('rb')


    try:
        # Create an OpenCV window
        cv2.namedWindow('Video Stream', cv2.WINDOW_NORMAL)


        while True:
            # Read the length of the next frame as a 32-bit little-endian uint
            raw_len = connection.read(struct.calcsize('<L'))
            if len(raw_len) < struct.calcsize('<L'):
                break  # connection closed by the server
            image_len = struct.unpack('<L', raw_len)[0]
            if image_len == 0:
                break


            # Construct a stream to hold the image data and read the image data from the connection
            image_stream = io.BytesIO(connection.read(image_len))


            # Rewind the stream and open it as an image with PIL
            image = Image.open(image_stream)


            # Convert PIL image to a NumPy array
            frame = np.array(image)


            # Convert RGB to BGR (OpenCV uses BGR format)
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)


            # Display the video stream (waitKey is required for imshow to refresh)
            cv2.imshow('Video Stream', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break



    finally:
        # Close the video socket
        video_socket.close()


# Function to draw buttons
def draw_button(surface, text, position, color, hover_color):
    rect = pygame.Rect(position, (button_width, button_height))
    pygame.draw.rect(surface, color, rect)


    # change the button color when the mouse is on the button
    mouse_pos = pygame.mouse.get_pos()
    if rect.collidepoint(mouse_pos):
        pygame.draw.rect(surface, hover_color, rect)
   
    font = pygame.font.SysFont(None, 24)
    text_surf = font.render(text, True, BLACK)
    text_rect = text_surf.get_rect(center=rect.center)
    surface.blit(text_surf, text_rect)


# Start the distance update thread
distance_thread = threading.Thread(target=update_distances)
distance_thread.daemon = True  # This ensures the thread will exit when the main program exits
distance_thread.start()


# Start a thread to receive and display the video stream
video_thread = threading.Thread(target=receive_video_stream)
video_thread.daemon = True
video_thread.start()


# Main loop
running = True
distances=[0,0,0]
while running:
    # Check for pygame events
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
        elif event.type == pygame.KEYDOWN:
            if event.key == pygame.K_UP:
                send_arrow_key("UP")
                print("up")
            elif event.key == pygame.K_DOWN:
                send_arrow_key("DOWN")
            elif event.key == pygame.K_LEFT:
                send_arrow_key("LEFT",50)
            elif event.key == pygame.K_RIGHT:
                send_arrow_key("RIGHT")
        elif event.type == pygame.MOUSEBUTTONDOWN:
            if stop_button_rect.collidepoint(event.pos):
                move_car('stop')


   
    # Fill the background
    screen.fill(WHITE)
    # Clear the camera surface
    camera_surface.fill(BLACK)  # Use a background color that suits your design


    # Draw the warning message if it exists
    if current_warning_message:
        font = pygame.font.SysFont('arial', 36)
        message_surf = font.render(current_warning_message, True, RED)
        message_rect = message_surf.get_rect(center=(camera_width // 2, camera_height // 2))


        # Blit the text onto the camera_surface
        camera_surface.blit(message_surf, message_rect)


    # Blit the camera_surface onto the main screen
    screen.blit(camera_surface, (0, 0))
   


    # Display the distances received from the server
    message_surface.fill(WHITE)
    font = pygame.font.SysFont('arial', 36)
    directions = ['right', 'left', 'back']  # order matches the server's distance1..3 readings
    for i, direction in enumerate(directions):
        color = RED if distances[i] < 10 else WHITE
        text_surf = font.render(f"{direction.capitalize()}: {distances[i]:.2f}cm", True, BLACK,color)
        text_rect = text_surf.get_rect(center=text_positions[direction])
        message_surface.blit(text_surf, text_rect)
    screen.blit(message_surface, (0, camera_height))
    # Draw the buttons
   
    buttons_rect = pygame.Rect(0, camera_height + message_height, screen_width, screen_height - camera_height - message_height)
    draw_region_with_border(screen, buttons_rect, GRAY)
    for direction, position in button_positions.items():
        draw_button(screen, direction.capitalize(), position, GREEN, LIGHT_GREEN)


    # Display the camera feed (placeholder)
    display_camera_feed()


    # Update the display
    pygame.display.flip()


pygame.quit()
client_socket.close()
video_socket.close()
motor_socket.close()

##################################################################################################################
#Thread_server.py (run on Raspberry Pi)

import socket
import time
import threading
import board
import adafruit_hcsr04
import RPi.GPIO as GPIO
import io
import picamera
import struct


GPIO.setmode(GPIO.BCM)
#motor control


motor1A=5
motor1B=19
motor2A=6
motor2B=26
quit_button_pin =23


GPIO.setup(quit_button_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(13,GPIO.OUT)
GPIO.setup(24,GPIO.OUT)
GPIO.setup(5,GPIO.OUT)
GPIO.setup(19,GPIO.OUT)
GPIO.setup(6,GPIO.OUT)
GPIO.setup(26,GPIO.OUT)
trigger1=16
echo1=12
trigger3=27
echo3=22
trigger2=25
echo2=17
GPIO.setup(trigger1,GPIO.OUT)
GPIO.setup(echo1,GPIO.IN)
GPIO.setup(trigger2,GPIO.OUT)
GPIO.setup(echo2,GPIO.IN)
GPIO.setup(trigger3,GPIO.OUT)
GPIO.setup(echo3,GPIO.IN)


def getDistance(trigger, echo):
    # Fire the trigger pulse
    GPIO.output(trigger, True)
    time.sleep(0.001)
    GPIO.output(trigger, False)
    starttime = time.time()
    stoptime = time.time()

    # Wait for the echo, with a guard so a missed pulse cannot hang the loop
    deadline = time.time() + 0.04
    while GPIO.input(echo) == 0 and time.time() < deadline:
        starttime = time.time()
    while GPIO.input(echo) == 1 and time.time() < deadline:
        stoptime = time.time()

    timeElapsed = stoptime - starttime
    # distance = (elapsed time * speed of sound) / 2; speed of sound ~ 34300 cm/s
    distance = (timeElapsed * 34300) / 2
    return distance


# Set up PWM on the two motor enable pins
pwm_frequency = 100
duty_cycle = 0
pwm13 = GPIO.PWM(13, pwm_frequency)
pwm13.start(duty_cycle)
pwm24 = GPIO.PWM(24, pwm_frequency)
pwm24.start(duty_cycle)


# Function to be called when the quit button is pressed
def quit_program(channel):
    print("Quit button pressed. Exiting program.")
    client_socket.close()
    server_socket.close()
    video_server_socket.close()
    pwm13.ChangeDutyCycle(0)
    pwm13.stop()
    pwm24.ChangeDutyCycle(0)
    pwm24.stop()
    GPIO.cleanup()
    exit()


# Add a callback for the quit button press event
GPIO.add_event_detect(quit_button_pin, GPIO.FALLING, callback=quit_program, bouncetime=300)


#motor control function
def move_forward(speed):
    print("move forward")
    pwm24.ChangeDutyCycle(speed)
    pwm13.ChangeDutyCycle(speed)
    GPIO.output(5,GPIO.LOW)
    GPIO.output(6,GPIO.HIGH)
    GPIO.output(19,GPIO.HIGH)
    GPIO.output(26,GPIO.LOW)


def move_backward(speed):
    print("move backward")
    pwm24.ChangeDutyCycle(speed)
    pwm13.ChangeDutyCycle(speed)
    GPIO.output(5,GPIO.HIGH)
    GPIO.output(6,GPIO.LOW)
    GPIO.output(19,GPIO.LOW)
    GPIO.output(26,GPIO.HIGH)


def turn_right(speed):
    print("turn right")
    pwm13.ChangeDutyCycle(speed)
    pwm24.ChangeDutyCycle(0)
    GPIO.output(19,GPIO.HIGH)
    GPIO.output(26,GPIO.LOW)


def turn_left(speed):
    print("turn left")
    pwm24.ChangeDutyCycle(speed)
    pwm13.ChangeDutyCycle(0)
    GPIO.output(5,GPIO.LOW)
    GPIO.output(6,GPIO.HIGH)


def stop():
    pwm13.ChangeDutyCycle(0)
    pwm24.ChangeDutyCycle(0)


def motor_control_thread(motor_socket):
    while True:
        data = motor_socket.recv(1024).decode('utf-8')
        if data:
            commands = data.split(',')
            direction_command=commands[0]
            speed_command = int(commands[1].split(':')[1])
            print(direction_command)
            print(speed_command)
            if direction_command == "ArrowKey:UP":
                move_forward(speed_command)
            elif direction_command == "ArrowKey:DOWN":
                move_backward(speed_command)
            elif direction_command == "ArrowKey:LEFT":
                turn_left(speed_command)
            elif direction_command == "ArrowKey:RIGHT":
                turn_right(speed_command)
            elif direction_command == "ArrowKey:STOP":
                stop()
            print(f"Motor Command Received: {data}")
            time.sleep(0.1)




sensor_data = {"distance1":0.0,"distance2":0.0,"distance3":0.0}
def handle_client(client_socket):
    client_socket.settimeout(0.5)
    while True:
        try:
            distances=f"{sensor_data['distance1']:.2f},{sensor_data['distance2']:.2f},{sensor_data['distance3']:.2f}"
            client_socket.send(distances.encode('utf-8'))
            try:
                data=client_socket.recv(1024)
                if data:
                    print(f"Received:{data.decode('utf-8')}")
            except socket.timeout:
                pass
            time.sleep(0.5)
        except Exception as e:
            print(f"Ultrasonic error: {e}")
           
# Alternative reader using the adafruit_hcsr04 library (kept for reference;
# the manual getDistance() above proved more dependable in our testing)
def read_sensor(sensor):
    try:
        return sensor.distance
    except RuntimeError:
        print("Ultrasonic error reading ", sensor)
        time.sleep(0.05)




def read_sensors():   
    while True:
        distance1=getDistance(trigger1,echo1)
        sensor_data['distance1']=distance1
        if distance1 is not None:
            print(f"Distance 1: {distance1} cm")


        distance2=getDistance(trigger2,echo2)
        sensor_data['distance2']=distance2
        if distance2 is not None:
            print(f"Distance 2: {distance2} cm")


        distance3=getDistance(trigger3,echo3)
        sensor_data['distance3']=distance3
        if distance3 is not None:
            print(f"Distance 3: {distance3} cm")
        time.sleep(0.05)


       
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
video_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
motor_control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = '0.0.0.0'
port=12345
video_port=12346
motor_control_port=12347


server_socket.bind((host,port))
server_socket.listen(1)
video_server_socket.bind((host,video_port))
video_server_socket.listen(1)
motor_control_socket.bind((host,motor_control_port))
motor_control_socket.listen(1)




print(f"Server listening on {host}:{port}")
print(f"Video server listening on {host}:{video_port}")


client_socket, client_address=server_socket.accept()
print(f"Connection from {client_address}")
motor_client_socket,addr=motor_control_socket.accept()
print(f"Motor control connection from {addr}")
# Video streaming function
def send_video_stream(client_connection):


    connection = client_connection.makefile('wb')
   
    with picamera.PiCamera() as camera:
        camera.resolution = (640, 480)
        camera.framerate = 24
        camera.sharpness = 10
        camera.contrast = 10
        camera.brightness = 50
        camera.ISO = 100
        camera.exposure_compensation = 0
        camera.exposure_mode = 'auto'
        camera.awb_mode='auto'


        # Wait for the camera to warm up
        time.sleep(2)


        stream = io.BytesIO()
        for _ in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
            connection.write(struct.pack('<L', stream.tell()))        
            connection.flush()
            stream.seek(0)
            connection.write(stream.read())
            stream.seek(0)
            stream.truncate()


# Function to start the video stream in a separate thread
def start_video_stream():
    video_client_socket, addr = video_server_socket.accept()
    print(f"Video connection from {addr}")
    video_thread = threading.Thread(target=send_video_stream, args=(video_client_socket,))
    video_thread.start()


# Start the video stream automatically


video_stream_thread=threading.Thread(target=start_video_stream)
video_stream_thread.daemon=True
video_stream_thread.start()


#start the motor control thread
motor_thread=threading.Thread(target=motor_control_thread,args=(motor_client_socket,))
motor_thread.daemon=True
motor_thread.start()


sensor_thread=threading.Thread(target=read_sensors)
sensor_thread.daemon=True #this ensures the thread will exit when the main program exits
sensor_thread.start()






handle_client(client_socket)


client_socket.close()
server_socket.close()
video_server_socket.close()
pwm13.ChangeDutyCycle(0)
pwm13.stop()
pwm24.ChangeDutyCycle(0)
pwm24.stop()
GPIO.cleanup()